查看原文
其他

干货 | 携程基于Quasar协程的NIO实践

Ryan 携程技术 2022-05-25

作者简介

 

Ryan,携程Java开发工程师,对高并发、网络编程等领域有浓厚兴趣。


IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题。目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来Golang、Kotlin等语言的协程(Coroutine)能达到高性能与可读性的兼顾。

本文利用开源的Quasar框架提供的协程对系统进行NIO改造,解决以下两个问题:

1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。
2)使用更轻量的协程同步等待IO,替代处理NIO常用的异步回调。
 

一、Java异步编程与非阻塞IO



本文改造的系统处理来自前台的任务,通过HTTP请求对端服务,还通过RPC调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。
  

                         
基于epoll的NIO框架Netty在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理IO的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。
 

1.1 Java中的异步工具


Java项目大多使用JDK8,除线程外可以获得的异步的编程支持包括CompletableFuture,以及开源的RxJava、Vert.x等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。

如下为将一段简单的逻辑判断使用CompletableFuture进行异步改造后的对比。原始版本使用getA方法获得第一步的请求结果,根据其相应选择使用getB1还是getB2获取第二步的响应作为结果。

HttpResponse a = getA();
HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();}
String ans=b.getBody();

首先将三个获取响应的方法改为异步。此处假设getB1与getB2内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。

private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();


然后使用CompletableFuture的链式调用,将两个步骤组合起来:

String ans = getA() .thenCompose(a -> { if (a.getBody().equals("1")) { return getB1(); } else { return getB2(); } }).get() .getBody();

使用CompletableFuture的链式回调后,代码变得不友好。RxJava等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于if/else、switch/case,乃至while/for、break/continue这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO使用。当时使用NIO时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。

1.2 协程


协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。

和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。



协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。

1.3 Quasar任务调度原理


Quasar(https://github.com/puniverse/quasar是一个开源的Java协程框架,通过利用Java instrument技术对字节码进行修改,使方法挂起前后可以保存和恢复JVM栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为initial初始化,第二部为通过NIO获取网络响应。

public String instrumentDemo(){ initial(); String ans = getFromNIO(); return ans;}


Quasar会在initial前增加一个flag字段,表明当前方法执行的位置。第一次执行方法时,检查到flag为0,修改flag为1并继续往下执行initial方法。执行getFromNIO方法前插入字节码指令将栈帧中的数据全部保存在一个Quasar自定义的栈结构中,在执行getFromNIO后,挂起协程,让出线程资源。直至NIO异步完成后,协程调度器将第二次执行该方法,检测到flag为1,将会调用jump指令跳转到returnans语句前,并将保存的栈结构还原到当前栈中,最后调用人return ans语句,方法执行完毕。

二、系统异步IO改造



在项目中添加Quasar依赖后,可以使用Fiber类新建协程。建立的方法与线程类似。

new Fiber(()->{ //方法体}).start();

2.1 整合Netty与Quasar


系统使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和CompletableFuture两种对响应的异步处理方式。

CompletableFuture自JDK8推出,与之前的Future类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在CompletableFuture注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr框架对它也做了支持,提供了API用于在协程中等待CompletableFuture的结果。调用后,协程将挂起,直至future状态为已完成。

AsyncCompletionStage.get(future)

通过CompletableFuture作为通知中介,我们可以将AsyncHttpClient与Quasar做整合,挂起协程等待IO结果。

//创建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//创建请求Request request = createRequest();//将网络请求交给HttpClient执行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通过Quasar挂起协程Response response = AsyncCompletionStage.get(future);//获取网络结果后,通过future传递response并唤醒协程重新执行deal(response);

过程可由下图表示。


Quasar框架AsyncCompletionStage.get内部完成的工作相当于,在HttpClient返回的future上注册回调,回调的内容是“IO操作完成后通知调度器唤醒协程”,这样将NIO异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。

 

2.2 声明挂起方法


Quasar需要织入字节码接管挂起方法的调度,在项目主pom下添加quasar-maven-plugin插件,该插件将在编译后的class文件中修改字节码。

<plugin> <groupId>com.vlkan</groupId> <artifactId>quasar-maven-plugin</artifactId> <version>0.7.9</version> <executions> <execution> <goals> <goal>instrument</goal> </goals> </execution> </executions></plugin>

Quasar通过识别方法是否抛出了该框架定义的SuspendExecution异常决定是否修改字节码。Quasar框架在AsyncCompletionStage.get方法上声明了SuspendExceution异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在try/catch语句时,也必须抛出该异常。

public void startFiber() throws ExecutionException, InterruptedException { Fiber<Void> fiber = new Fiber<Void>(() -> { //不用继续抛出异常 Response response = waitNextLayer1(); deal(response); }).start();}
private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();}
private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具类抛出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}

2.3 异步RPC调用


目前主流的RPC框架都基于NIO实现,支持异步回调,有的RPC框架已经直接提供了返回CompletableFuture或ListenableFuture(Guava工具类提供)的异步接口,通过使用ComplatableFuture,可以按前文类似的方法将Quasar与RPC框架结合起来。当RPC框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:

interface Callback<TResponse> { void callback(TResponse TResponse, Exception e);}

这种情况,可以使用者自己创建ComplatableFuture,在回调中设置其状态,并调用AsyncCompletionStage.get等待这个future。

CompletableFuture<Response> future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback<Response>() { public void callback(Response response, Exception e) { if (e == null) future.complete(response); else future.completeExceptionally(e); }});//在此处调用Quasar的API,挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);

上述代码依然具有异步回调不直观的缺点,通过JDK8的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。

@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> { void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution { CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); });
try { //使用Quasar等待Future结果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}

最后的调用可简化一行代码,该方法适用于所有该Rpc框架提供的异步接口。

Response response= waitRpc(new RpcClient()::helloAsync, request);

2.4 阻塞操作的处理


Quasar协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞IO的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。

public void waitBlocking() throws SuspendExecution { //从DB获取结果 String ans = waitBlocking(this::selectFromDB);}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); });
try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}

2.5 并发工具的使用


协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在synchronized同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现IllegalMonitorStateException异常。



但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。

JDK并发包中的工具可分为两类,一类是Lock、Semaphore、CountDownLatch等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如await时,可能导致协程的执行线程中发生阻塞。
 

三、总结



系统运行在4核心的主机上,线程池构成如下。



业务逻辑运行在Quasar的协程调度线程池中,线程池大小为CPU核数。HTTP请求与RPC调用均通过内部的NIO线程池管理。此外定义了一个core size为8的可伸缩的线程池用于少量消息队列、DB等阻塞IO的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。

改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而CPU利用率也从平均5%以下提升至10%-60%,在瞬时与高峰流量下能保持稳定。集群CPU核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至1/5。
 

3.1 限制与风险


Quasar协程不是Java的语言标准,没有JVM层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。

代码的try/catch时可能同时捕获SuspendExecution异常,从而忘记标记方法,此方法字节码不会被修改,结合Quasar的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加SuspendExecution标记。

在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread的构造方法中传入的是Runnable接口对象,其run方法没有声明SuspendExecution异常,run内部的语句不会被织入字节码,造成上述异常。
 

3.2 总结与展望


协程使得NIO能够更好地应用在Java中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。

异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在2018年创建了Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在JVM上实现轻量级的线程,并解除JVM线程与内核线程的映射。相信会给Java生态带来巨大的改变。

【推荐阅读】


“携程技术”公众号后台回复“新书”

免费获得两本书的试读样章~

《携程架构实践》

京东

当当

《携程人工智能实践》

京东

当当



 “携程技术”公众号

  分享,交流,成长



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存